Kafka 生产者 API 详解与实践
Kafka 生产者(Producer)是消息的发送端,负责将业务数据发送到 Kafka 集群。通过 Kafka 提供的 Producer API,我们可以灵活配置生产者行为,支持同步 / 异步发送、自定义分区策略、消息压缩等功能。本文将详细介绍生产者的实现步骤、核心配置及不同发送方式的代码示例
生产者核心概念与配置
核心配置参数
创建生产者时,需通过 Properties 对象配置关键参数,其中必选参数有三个:
| 参数名 |
作用 |
示例值 |
bootstrap.servers |
指定 Kafka 集群地址(多个用逗号分隔) |
localhost:9092 |
key.serializer |
消息键(Key)的序列化类(需实现 Serializer 接口) |
org.apache.kafka.common.serialization.StringSerializer |
value.serializer |
消息值(Value)的序列化类 |
org.apache.kafka.common.serialization.StringSerializer |
常用可选参数(参考 ProducerConfig 类):
| 参数名 |
作用 |
默认值 |
acks |
消息确认级别(0:不确认;1:Leader 确认;-1/all:Leader + 所有 ISR 副本确认) |
1 |
retries |
发送失败后的重试次数 |
0 |
batch.size |
批次大小(达到该值后批量发送,单位:字节) |
16384(16KB) |
linger.ms |
批处理等待时间(若未达 batch.size,超时后也会发送) |
0(立即发送) |
buffer.memory |
发送缓冲区大小(消息暂存此处等待发送) |
33554432(32MB) |
compression.type |
消息压缩算法(none/gzip/snappy/lz4) |
none |
生产者工作原理
Kafka 生产者发送消息的流程如下:
- 消息被封装为
ProducerRecord(包含主题、键、值、分区等信息)。
- 消息经序列化后进入发送缓冲区(
RecordAccumulator),按分区聚合为批次(RecordBatch)。
- 后台
Sender 线程从缓冲区拉取批次,发送到目标 Broker 的对应分区。
- 收到 Broker 确认后,触发回调(若配置)或返回结果。
生产者实现方式
单线程生产者
单线程生产者适用于消息量不大的场景,实现步骤如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
| import org.apache.kafka.clients.producer.*; import java.util.Properties; import java.util.concurrent.ExecutionException;
public class KafkaSingleThreadProducer { private static final int MSG_COUNT = 5; private static final String TOPIC_NAME = "test-topic"; private static KafkaProducer<String, String> producer;
static { Properties props = initConfig(); producer = new KafkaProducer<>(props); }
private static Properties initConfig() { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.RETRIES_CONFIG, 3); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); props.put(ProducerConfig.LINGER_MS_CONFIG, 100); return props; }
public static void sendSync() { try { for (int i = 0; i < MSG_COUNT; i++) { ProducerRecord<String, String> record = new ProducerRecord<>( TOPIC_NAME, "sync-key-" + i, "sync-value-" + i ); RecordMetadata metadata = producer.send(record).get(); System.out.printf("同步发送成功 - 分区:%d,偏移量:%d%n", metadata.partition(), metadata.offset()); Thread.sleep(500); } } catch (InterruptedException | ExecutionException e) { System.err.println("同步发送失败:" + e.getMessage()); } finally { producer.close(); } }
public static void sendAsync() { try { for (int i = 0; i < MSG_COUNT; i++) { ProducerRecord<String, String> record = new ProducerRecord<>( TOPIC_NAME, "async-key-" + i, "async-value-" + i ); producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception == null) { System.out.printf("异步发送成功 - 分区:%d,偏移量:%d%n", metadata.partition(), metadata.offset()); } else { System.err.println("异步发送失败:" + exception.getMessage()); } } }); Thread.sleep(500); } } catch (InterruptedException e) { System.err.println("线程中断:" + e.getMessage()); } finally { producer.close(); } }
public static void main(String[] args) { System.out.println("===== 同步发送 ====="); sendSync(); System.out.println("\n===== 异步发送 ====="); sendAsync(); } }
|
关键区别:
- 同步发送:通过
producer.send(record).get() 阻塞等待结果,确保消息发送状态,但吞吐量较低。
- 异步发送:通过
Callback 回调非阻塞处理结果,吞吐量更高,适合高并发场景。
多线程生产者
当消息量较大时,可使用多线程生产者提高发送效率。注意:KafkaProducer 是线程安全的,可被多个线程共享,无需为每个线程创建单独实例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
| import org.apache.kafka.clients.producer.*; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
public class KafkaMultiThreadProducer { private static final int MSG_COUNT = 10; private static final String TOPIC_NAME = "test-topic"; private static KafkaProducer<String, String> producer;
static { Properties props = initConfig(); producer = new KafkaProducer<>(props); }
private static Properties initConfig() { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.ACKS_CONFIG, "1"); props.put(ProducerConfig.RETRIES_CONFIG, 2); props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); return props; }
static class ProducerTask implements Runnable { private ProducerRecord<String, String> record;
public ProducerTask(ProducerRecord<String, String> record) { this.record = record; }
@Override public void run() { producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { System.err.println("线程 " + Thread.currentThread().getId() + " 发送失败:" + exception.getMessage()); } else { System.out.printf("线程 %d 发送成功 - 分区:%d,偏移量:%d%n", Thread.currentThread().getId(), metadata.partition(), metadata.offset()); } } }); } }
public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(4); try { for (int i = 0; i < MSG_COUNT; i++) { ProducerRecord<String, String> record = new ProducerRecord<>( TOPIC_NAME, "multi-key-" + i, "multi-value-" + i ); executor.submit(new ProducerTask(record)); } } finally { executor.shutdown(); producer.close(); } } }
|
优势:
- 利用线程池并行处理发送任务,提高吞吐量。
- 共享单个
KafkaProducer 实例,减少资源消耗(避免重复创建连接)。
自定义分区策略
Kafka 默认根据消息键(Key)的哈希值分配分区,若需自定义分区规则(如按业务类型分配),可实现 Partitioner 接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import java.util.Map;
public class CustomPartitioner implements Partitioner {
@Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { int numPartitions = cluster.partitionsForTopic(topic).size(); String valueStr = value.toString(); if (valueStr.contains("order")) { return 0 % numPartitions; } else if (valueStr.contains("log")) { return 1 % numPartitions; } else { return (key == null) ? 0 : Math.abs(key.hashCode() % numPartitions); } }
@Override public void close() { }
@Override public void configure(Map<String, ?> configs) { } }
|
使用自定义分区器:
在生产者配置中添加:
1
| props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.example.CustomPartitioner");
|
最佳实践
- 选择合适的发送方式:
- 同步发送:适用于对消息可靠性要求高、吞吐量要求低的场景(如金融交易)。
- 异步发送:适用于高吞吐量场景(如日志收集),通过回调处理异常。
- 优化性能:
- 启用批量发送(
batch.size + linger.ms)和压缩(compression.type)。
- 合理设置缓冲区大小(
buffer.memory),避免缓冲区满导致阻塞。
- 确保可靠性:
- 关键业务设置
acks=all + 适当的重试次数(retries)。
- 配合回调函数处理发送失败,实现消息重试或落库补偿。
- 线程安全:
KafkaProducer 是线程安全的,多线程场景下应共享实例而非创建多个